在取用個資料的時候,都是一個一個執行,分散各地,為了方便需要整合;訊號燈也是如此,都放在個別的jupyter notebook中,不適合做成各自獨立的訊號燈。因此這邊會把他們模組化。
首先登場的是取股票資料。
本日程式碼使用:stock_transaction.py
部分資料已經把它模組化,像是取得三大法人期選交易狀況(Day6)、加權指數(Day10),但是Day4的程式還沒有,
可參考Day4的程式碼。這邊把功能抽離出來,變成一個class
:stock_transaction
。並且把要取得資料的連結先設定在__init__
中。
def __init__(self) -> None:
self.title = "盤後資訊 > 個股日成交資訊"
self.url = (
"http://www.twse.com.tw/exchangeReport/STOCK_DAY_ALL?response=open_data"
)
self.df = None # 把資料從csv轉乘datframe
self.trade_date = "2021-1-1" # 交易日
_get_and_set_df_data
、_create_new_header
跟之前相同,需要一個從API取得資料並放在ataframe
的功能_get_and_set_df_data
,以及中文欄位與資料庫欄位的轉換功能_create_new_header
。
其中在_get_and_set_df_data
特別要把日期給改好,直接從提供的檔名當作日期,取代之前的「執行當日」為日期,因為今天還沒有開盤或是資料仍是舊的,這個當下的日期寫入資料庫中,就是錯誤的。因此可從headers
中取得檔名(STOCK_DAY_ALL_20210924.csv
)並且取得日期(20210924
)。
csv = requests.get(self.url)
df = pandas.read_csv(StringIO(csv.text)) # 有header
# print(df) # debug
self.df = df
# 從檔名取得日期,檔名:STOCK_DAY_ALL_20210924.csv
trade_date_raw = csv.headers.get("Content-Disposition")[-13:-5]
self.trade_date = (
f"{trade_date_raw[:4]}-{trade_date_raw[4:6]}-{trade_date_raw[6:]}"
)
_insert_mysql()
當然還要匯入資料庫的功能_insert_mysql
,原本的now
參數本來是當下日期,改成從上面說明取得的日期self.trade_date
。
執得注意的是,這邊也是沿用之前建立的db_connect
這個模組來連線MySQL。
def _insert_mysql(self) -> bool:
try:
new_headers = self._create_new_header(self.df.columns)
df = self.df[1:] # 拿掉第一行的資料
df.columns = new_headers # 設定資料欄位的名稱
print(df)
counter = 0 # 記錄欲新增數量
# 建立connection物件
my_connt_obj = db_connect.mysql_connect()
conn = my_connt_obj.connect()
with conn.cursor() as cursor:
now = self.trade_date
# 新增SQL語法
for _, row in df.iterrows():
try:
cmd = """INSERT IGNORE INTO DailyPrice
(StockID, Symbol, TradeDate, OpenPrice, HighPrice,
LowPrice, ClosePrice, Volumn)
values(%s,%s,%s,%s,%s,%s,%s,%s);"""
cursor.execute(
cmd,
(
None,
row.stock_symbol,
now,
row.open if pandas.notnull(row.open) else 0,
row.high if pandas.notnull(row.high) else 0,
row.low if pandas.notnull(row.low) else 0,
row.close if pandas.notnull(row.close) else 0,
row.volume if pandas.notnull(row.volume) else 0,
),
)
conn.commit()
counter += 1
except Exception as e:
print(e)
get_and_save()
這邊就很簡單的呼叫使用,如果沒有這個話,就要自行呼叫兩個function
出來
def get_and_save(self, url=None):
"""Get today transaction data and save into MySQL.
Args:
param1 (str): 資料的url
Returns:
bool: 回傳結果. True 表示儲存成功,False 表示沒有儲存至資料庫
"""
r = self._get_and_set_df_data(url)
if r:
r = self._insert_mysql()
else:
return False
return r
這邊有稍微把一些不合理的地方調整一下,也讓整個程式稍微乾淨一點,並讓程式比較獨立,成為一個模組。